package com.nulldev.util.internal.backport.httpclient_rw;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.nulldev.util.internal.backport.concurrency9.Lists;
import com.nulldev.util.internal.backport.concurrency9.Objects;
import com.nulldev.util.internal.backport.concurrency9.concurrent.Flow;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.BufferSupplier;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Demand;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.FlowTube;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Log;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Logger;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.SequentialScheduler;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.SequentialScheduler.DeferredCompleter;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.SequentialScheduler.RestartableTask;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Utils;

/**
 * A SocketTube is a terminal tube plugged directly into the socket. The read
 * subscriber should call {@code subscribe} on the SocketTube before the
 * SocketTube is subscribed to the write publisher.
 */
final class SocketTube implements FlowTube {

	final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
	static final AtomicLong IDS = new AtomicLong();

	private final HttpClientImpl client;
	private final SocketChannel channel;
	private final SliceBufferSource sliceBuffersSource;
	private final Object lock = new Object();
	private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
	private final InternalReadPublisher readPublisher;
	private final InternalWriteSubscriber writeSubscriber;
	private final long id = IDS.incrementAndGet();

	public SocketTube(HttpClientImpl client, SocketChannel channel, Supplier<ByteBuffer> buffersFactory) {
		this.client = client;
		this.channel = channel;
		this.sliceBuffersSource = new SliceBufferSource(buffersFactory);

		this.readPublisher = new InternalReadPublisher();
		this.writeSubscriber = new InternalWriteSubscriber();
	}

	/**
	 * Returns {@code true} if this flow is finished. This happens when this flow
	 * internal read subscription is completed, either normally (EOF reading) or
	 * exceptionally (EOF writing, or underlying socket closed, or some exception
	 * occurred while reading or writing to the socket).
	 *
	 * @return {@code true} if this flow is finished.
	 */
	public boolean isFinished() {
		InternalReadPublisher.InternalReadSubscription subscription = readPublisher.subscriptionImpl;
		return subscription != null && subscription.completed || subscription == null && errorRef.get() != null;
	}

	// ===================================================================== //
	// Flow.Publisher //
	// ======================================================================//

	/**
	 * {@inheritDoc }
	 * 
	 * @apiNote This method should be called first. In particular, the caller must
	 *          ensure that this method must be called by the read subscriber before
	 *          the write publisher can call {@code onSubscribe}. Failure to adhere
	 *          to this contract may result in assertion errors.
	 */
	@Override
	public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
		Objects.requireNonNull(s);
		assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s;
		readPublisher.subscribe(s);
	}

	// ===================================================================== //
	// Flow.Subscriber //
	// ======================================================================//

	/**
	 * {@inheritDoc }
	 * 
	 * @apiNote The caller must ensure that {@code subscribe} is called by the read
	 *          subscriber before {@code onSubscribe} is called by the write
	 *          publisher. Failure to adhere to this contract may result in
	 *          assertion errors.
	 */
	@Override
	public void onSubscribe(Flow.Subscription subscription) {
		writeSubscriber.onSubscribe(subscription);
	}

	@Override
	public void onNext(List<ByteBuffer> item) {
		writeSubscriber.onNext(item);
	}

	@Override
	public void onError(Throwable throwable) {
		writeSubscriber.onError(throwable);
	}

	@Override
	public void onComplete() {
		writeSubscriber.onComplete();
	}

	// ===================================================================== //
	// Events //
	// ======================================================================//

	void signalClosed() {
		// Ensures that the subscriber will be terminated and that future
		// subscribers will be notified when the connection is closed.
		if (Log.channel()) {
			Log.logChannel("Connection close signalled: connection closed locally ({0})", channelDescr());
		}
		readPublisher.subscriptionImpl.signalError(new IOException("connection closed locally"));
	}

	/**
	 * A restartable task used to process tasks in sequence.
	 */
	private static class SocketFlowTask implements RestartableTask {
		final Runnable task;
		private final Object monitor = new Object();

		SocketFlowTask(Runnable task) {
			this.task = task;
		}

		@Override
		public final void run(DeferredCompleter taskCompleter) {
			try {
				// non contentious synchronized for visibility.
				synchronized (monitor) {
					task.run();
				}
			} finally {
				taskCompleter.complete();
			}
		}
	}

	// This is best effort - there's no guarantee that the printed set of values
	// is consistent. It should only be considered as weakly accurate - in
	// particular in what concerns the events states, especially when displaying
	// a read event state from a write event callback and conversely.
	void debugState(String when) {
		if (debug.on()) {
			StringBuilder state = new StringBuilder();

			InternalReadPublisher.InternalReadSubscription sub = readPublisher.subscriptionImpl;
			InternalReadPublisher.ReadEvent readEvent = sub == null ? null : sub.readEvent;
			Demand rdemand = sub == null ? null : sub.demand;
			InternalWriteSubscriber.WriteEvent writeEvent = writeSubscriber.writeEvent;
			Demand wdemand = writeSubscriber.writeDemand;
			int rops = readEvent == null ? 0 : readEvent.interestOps();
			long rd = rdemand == null ? 0 : rdemand.get();
			int wops = writeEvent == null ? 0 : writeEvent.interestOps();
			long wd = wdemand == null ? 0 : wdemand.get();

			state.append(when).append(" Reading: [ops=").append(rops).append(", demand=").append(rd).append(", stopped=")
					.append((sub == null ? false : sub.readScheduler.isStopped())).append("], Writing: [ops=").append(wops).append(", demand=").append(wd)
					.append("]");
			debug.log(state.toString());
		}
	}

	/**
	 * A repeatable event that can be paused or resumed by changing its interestOps.
	 * When the event is fired, it is first paused before being signaled. It is the
	 * responsibility of the code triggered by {@code signalEvent} to resume the
	 * event if required.
	 */
	private static abstract class SocketFlowEvent extends AsyncEvent {
		final SocketChannel channel;
		final int defaultInterest;
		volatile int interestOps;
		volatile boolean registered;

		SocketFlowEvent(int defaultInterest, SocketChannel channel) {
			super(AsyncEvent.REPEATING);
			this.defaultInterest = defaultInterest;
			this.channel = channel;
		}

		final boolean registered() {
			return registered;
		}

		final void resume() {
			interestOps = defaultInterest;
			registered = true;
		}

		final void pause() {
			interestOps = 0;
		}

		@Override
		public final SelectableChannel channel() {
			return channel;
		}

		@Override
		public final int interestOps() {
			return interestOps;
		}

		@Override
		public final void handle() {
			pause(); // pause, then signal
			signalEvent(); // won't be fired again until resumed.
		}

		@Override
		public final void abort(IOException error) {
			debug().log(() -> "abort: " + error);
			pause(); // pause, then signal
			signalError(error); // should not be resumed after abort (not checked)
		}

		protected abstract void signalEvent();

		protected abstract void signalError(Throwable error);

		abstract Logger debug();
	}

	// ===================================================================== //
	// Writing //
	// ======================================================================//

	// This class makes the assumption that the publisher will call onNext
	// sequentially, and that onNext won't be called if the demand has not been
	// incremented by request(1).
	// It has a 'queue of 1' meaning that it will call request(1) in
	// onSubscribe, and then only after its 'current' buffer list has been
	// fully written and current set to null;
	private final class InternalWriteSubscriber implements Flow.Subscriber<List<ByteBuffer>> {

		volatile WriteSubscription subscription;
		volatile List<ByteBuffer> current;
		volatile boolean completed;
		final AsyncTriggerEvent startSubscription = new AsyncTriggerEvent(this::signalError, this::startSubscription);
		final WriteEvent writeEvent = new WriteEvent(channel, this);
		final Demand writeDemand = new Demand();

		@Override
		public void onSubscribe(Flow.Subscription subscription) {
			WriteSubscription previous = this.subscription;
			if (debug.on())
				debug.log("subscribed for writing");
			try {
				boolean needEvent = current == null;
				if (needEvent) {
					if (previous != null && previous.upstreamSubscription != subscription) {
						previous.dropSubscription();
					}
				}
				this.subscription = new WriteSubscription(subscription);
				if (needEvent) {
					if (debug.on())
						debug.log("write: registering startSubscription event");
					client.registerEvent(startSubscription);
				}
			} catch (Throwable t) {
				signalError(t);
			}
		}

		@Override
		public void onNext(List<ByteBuffer> bufs) {
			assert current == null : dbgString() // this is a queue of 1.
					+ "w.onNext current: " + current;
			assert subscription != null : dbgString() + "w.onNext: subscription is null";
			current = bufs;
			tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
			// For instance in HTTP/2, a received SETTINGS frame might trigger
			// the sending of a SETTINGS frame in turn which might cause
			// onNext to be called from within the same selector thread that the
			// original SETTINGS frames arrived on. If rs is the read-subscriber
			// and ws is the write-subscriber then the following can occur:
			// ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
			// client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
			debugState("leaving w.onNext");
		}

		// Don't use a SequentialScheduler here: rely on onNext() being invoked
		// sequentially, and not being invoked if there is no demand, request(1).
		// onNext is usually called from within a user / executor thread.
		// Initial writing will be performed in that thread. If for some reason,
		// not all the data can be written, a writeEvent will be registered, and
		// writing will resume in the the selector manager thread when the
		// writeEvent is fired.
		//
		// If this method is invoked in the selector manager thread (because of
		// a writeEvent), then the executor will be used to invoke request(1),
		// ensuring that onNext() won't be invoked from within the selector
		// thread. If not in the selector manager thread, then request(1) is
		// invoked directly.
		void tryFlushCurrent(boolean inSelectorThread) {
			List<ByteBuffer> bufs = current;
			if (bufs == null)
				return;
			try {
				assert inSelectorThread == client.isSelectorThread() : "should " + (inSelectorThread ? "" : "not ") + " be in the selector thread";
				long remaining = Utils.remaining(bufs);
				if (debug.on())
					debug.log("trying to write: %d", remaining);
				long written = writeAvailable(bufs);
				if (debug.on())
					debug.log("wrote: %d", written);
				assert written >= 0 : "negative number of bytes written:" + written;
				assert written <= remaining;
				if (remaining - written == 0) {
					current = null;
					if (writeDemand.tryDecrement()) {
						Runnable requestMore = this::requestMore;
						if (inSelectorThread) {
							assert client.isSelectorThread();
							client.theExecutor().execute(requestMore);
						} else {
							assert !client.isSelectorThread();
							requestMore.run();
						}
					}
				} else {
					resumeWriteEvent(inSelectorThread);
				}
			} catch (Throwable t) {
				signalError(t);
			}
		}

		// Kick off the initial request:1 that will start the writing side.
		// Invoked in the selector manager thread.
		void startSubscription() {
			try {
				if (debug.on())
					debug.log("write: starting subscription");
				if (Log.channel()) {
					Log.logChannel("Start requesting bytes for writing to channel: {0}", channelDescr());
				}
				assert client.isSelectorThread();
				// make sure read registrations are handled before;
				readPublisher.subscriptionImpl.handlePending();
				if (debug.on())
					debug.log("write: offloading requestMore");
				// start writing;
				client.theExecutor().execute(this::requestMore);
			} catch (Throwable t) {
				signalError(t);
			}
		}

		void requestMore() {
			WriteSubscription subscription = this.subscription;
			subscription.requestMore();
		}

		@Override
		public void onError(Throwable throwable) {
			signalError(throwable);
		}

		@Override
		public void onComplete() {
			completed = true;
			// no need to pause the write event here: the write event will
			// be paused if there is nothing more to write.
			List<ByteBuffer> bufs = current;
			long remaining = bufs == null ? 0 : Utils.remaining(bufs);
			if (debug.on())
				debug.log("write completed, %d yet to send", remaining);
			debugState("InternalWriteSubscriber::onComplete");
		}

		void resumeWriteEvent(boolean inSelectorThread) {
			if (debug.on())
				debug.log("scheduling write event");
			resumeEvent(writeEvent, this::signalError);
		}

		void signalWritable() {
			if (debug.on())
				debug.log("channel is writable");
			tryFlushCurrent(true);
		}

		void signalError(Throwable error) {
			debug.log(() -> "write error: " + error);
			if (Log.channel()) {
				Log.logChannel("Failed to write to channel ({0}: {1})", channelDescr(), error);
			}
			completed = true;
			readPublisher.signalError(error);
			Flow.Subscription subscription = this.subscription;
			if (subscription != null)
				subscription.cancel();
		}

		// A repeatable WriteEvent which is paused after firing and can
		// be resumed if required - see SocketFlowEvent;
		final class WriteEvent extends SocketFlowEvent {
			final InternalWriteSubscriber sub;

			WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) {
				super(SelectionKey.OP_WRITE, channel);
				this.sub = sub;
			}

			@Override
			protected final void signalEvent() {
				try {
					client.eventUpdated(this);
					sub.signalWritable();
				} catch (Throwable t) {
					sub.signalError(t);
				}
			}

			@Override
			protected void signalError(Throwable error) {
				sub.signalError(error);
			}

			@Override
			Logger debug() {
				return debug;
			}
		}

		final class WriteSubscription implements Flow.Subscription {
			final Flow.Subscription upstreamSubscription;
			volatile boolean cancelled;

			WriteSubscription(Flow.Subscription subscription) {
				this.upstreamSubscription = subscription;
			}

			@Override
			public void request(long n) {
				if (cancelled)
					return;
				upstreamSubscription.request(n);
			}

			@Override
			public void cancel() {
				if (cancelled)
					return;
				if (debug.on())
					debug.log("write: cancel");
				if (Log.channel()) {
					Log.logChannel("Cancelling write subscription");
				}
				dropSubscription();
				upstreamSubscription.cancel();
			}

			void dropSubscription() {
				synchronized (InternalWriteSubscriber.this) {
					cancelled = true;
					if (debug.on())
						debug.log("write: resetting demand to 0");
					writeDemand.reset();
				}
			}

			void requestMore() {
				try {
					if (completed || cancelled)
						return;
					boolean requestMore;
					long d;
					// don't fiddle with demand after cancel.
					// see dropSubscription.
					synchronized (InternalWriteSubscriber.this) {
						if (cancelled)
							return;
						d = writeDemand.get();
						requestMore = writeDemand.increaseIfFulfilled();
					}
					if (requestMore) {
						if (debug.on())
							debug.log("write: requesting more...");
						upstreamSubscription.request(1);
					} else {
						if (debug.on())
							debug.log("write: no need to request more: %d", d);
					}
				} catch (Throwable t) {
					if (debug.on())
						debug.log("write: error while requesting more: " + t);
					signalError(t);
				} finally {
					debugState("leaving requestMore: ");
				}
			}
		}
	}

	// ===================================================================== //
	// Reading //
	// ===================================================================== //

	// The InternalReadPublisher uses a SequentialScheduler to ensure that
	// onNext/onError/onComplete are called sequentially on the caller's
	// subscriber.
	// However, it relies on the fact that the only time where
	// runOrSchedule() is called from a user/executor thread is in signalError,
	// right after the errorRef has been set.
	// Because the sequential scheduler's task always checks for errors first,
	// and always terminate the scheduler on error, then it is safe to assume
	// that if it reaches the point where it reads from the channel, then
	// it is running in the SelectorManager thread. This is because all
	// other invocation of runOrSchedule() are triggered from within a
	// ReadEvent.
	//
	// When pausing/resuming the event, some shortcuts can then be taken
	// when we know we're running in the selector manager thread
	// (in that case there's no need to call client.eventUpdated(readEvent);
	//
	private final class InternalReadPublisher implements Flow.Publisher<List<ByteBuffer>> {
		private final InternalReadSubscription subscriptionImpl = new InternalReadSubscription();
		AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
		private volatile ReadSubscription subscription;

		@Override
		public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
			Objects.requireNonNull(s);

			TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
			ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
			ReadSubscription previous = pendingSubscription.getAndSet(target);

			if (previous != null && previous != target) {
				if (debug.on())
					debug.log("read publisher: dropping pending subscriber: " + previous.subscriber);
				previous.errorRef.compareAndSet(null, errorRef.get());
				previous.signalOnSubscribe();
				if (subscriptionImpl.completed) {
					previous.signalCompletion();
				} else {
					previous.subscriber.dropSubscription();
				}
			}

			if (debug.on())
				debug.log("read publisher got subscriber");
			subscriptionImpl.signalSubscribe();
			debugState("leaving read.subscribe: ");
		}

		void signalError(Throwable error) {
			if (debug.on())
				debug.log("error signalled " + error);
			if (!errorRef.compareAndSet(null, error)) {
				return;
			}
			if (Log.channel()) {
				Log.logChannel("Error signalled on channel {0}: {1}", channelDescr(), error);
			}
			subscriptionImpl.handleError();
		}

		final class ReadSubscription implements Flow.Subscription {
			final InternalReadSubscription impl;
			final TubeSubscriber subscriber;
			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
			final BufferSource bufferSource;
			volatile boolean subscribed;
			volatile boolean cancelled;
			volatile boolean completed;

			public ReadSubscription(InternalReadSubscription impl, TubeSubscriber subscriber) {
				this.impl = impl;
				this.bufferSource = subscriber.supportsRecycling() ? new SSLDirectBufferSource(client) : SocketTube.this.sliceBuffersSource;
				this.subscriber = subscriber;
			}

			@Override
			public void cancel() {
				cancelled = true;
			}

			@Override
			public void request(long n) {
				if (!cancelled) {
					impl.request(n);
				} else {
					if (debug.on())
						debug.log("subscription cancelled, ignoring request %d", n);
				}
			}

			void signalCompletion() {
				assert subscribed || cancelled;
				if (completed || cancelled)
					return;
				synchronized (this) {
					if (completed)
						return;
					completed = true;
				}
				Throwable error = errorRef.get();
				if (error != null) {
					if (debug.on())
						debug.log("forwarding error to subscriber: " + error);
					subscriber.onError(error);
				} else {
					if (debug.on())
						debug.log("completing subscriber");
					subscriber.onComplete();
				}
			}

			void signalOnSubscribe() {
				if (subscribed || cancelled)
					return;
				synchronized (this) {
					if (subscribed || cancelled)
						return;
					subscribed = true;
				}
				subscriber.onSubscribe(this);
				if (debug.on())
					debug.log("onSubscribe called");
				if (errorRef.get() != null) {
					signalCompletion();
				}
			}
		}

		final class InternalReadSubscription implements Flow.Subscription {

			private final Demand demand = new Demand();
			final SequentialScheduler readScheduler;
			private volatile boolean completed;
			private final ReadEvent readEvent;
			private final AsyncEvent subscribeEvent;

			InternalReadSubscription() {
				readScheduler = new SequentialScheduler(new SocketFlowTask(this::read));
				subscribeEvent = new AsyncTriggerEvent(this::signalError, this::handleSubscribeEvent);
				readEvent = new ReadEvent(channel, this);
			}

			/*
			 * This method must be invoked before any other method of this class.
			 */
			final void signalSubscribe() {
				if (readScheduler.isStopped() || completed) {
					// if already completed or stopped we can handle any
					// pending connection directly from here.
					if (debug.on())
						debug.log("handling pending subscription while completed");
					handlePending();
				} else {
					try {
						if (debug.on())
							debug.log("registering subscribe event");
						client.registerEvent(subscribeEvent);
					} catch (Throwable t) {
						signalError(t);
						handlePending();
					}
				}
			}

			final void handleSubscribeEvent() {
				assert client.isSelectorThread();
				debug.log("subscribe event raised");
				if (Log.channel())
					Log.logChannel("Start reading from {0}", channelDescr());
				readScheduler.runOrSchedule();
				if (readScheduler.isStopped() || completed) {
					// if already completed or stopped we can handle any
					// pending connection directly from here.
					if (debug.on())
						debug.log("handling pending subscription when completed");
					handlePending();
				}
			}

			/*
			 * Although this method is thread-safe, the Reactive-Streams spec seems to not
			 * require it to be as such. It's a responsibility of the subscriber to signal
			 * demand in a thread-safe manner.
			 *
			 * See Reactive Streams specification, rules 2.7 and 3.4.
			 */
			@Override
			public final void request(long n) {
				if (n > 0L) {
					boolean wasFulfilled = demand.increase(n);
					if (wasFulfilled) {
						if (debug.on())
							debug.log("got some demand for reading");
						resumeReadEvent();
						// if demand has been changed from fulfilled
						// to unfulfilled register read event;
					}
				} else {
					signalError(new IllegalArgumentException("non-positive request"));
				}
				debugState("leaving request(" + n + "): ");
			}

			@Override
			public final void cancel() {
				pauseReadEvent();
				if (Log.channel()) {
					Log.logChannel("Read subscription cancelled for channel {0}", channelDescr());
				}
				readScheduler.stop();
			}

			private void resumeReadEvent() {
				if (debug.on())
					debug.log("resuming read event");
				resumeEvent(readEvent, this::signalError);
			}

			private void pauseReadEvent() {
				if (debug.on())
					debug.log("pausing read event");
				pauseEvent(readEvent, this::signalError);
			}

			final void handleError() {
				assert errorRef.get() != null;
				readScheduler.runOrSchedule();
			}

			final void signalError(Throwable error) {
				if (!errorRef.compareAndSet(null, error)) {
					return;
				}
				if (debug.on())
					debug.log("got read error: " + error);
				if (Log.channel()) {
					Log.logChannel("Read error signalled on channel {0}: {1}", channelDescr(), error);
				}
				readScheduler.runOrSchedule();
			}

			final void signalReadable() {
				readScheduler.runOrSchedule();
			}

			/** The body of the task that runs in SequentialScheduler. */
			final void read() {
				// It is important to only call pauseReadEvent() when stopping
				// the scheduler. The event is automatically paused before
				// firing, and trying to pause it again could cause a race
				// condition between this loop, which calls tryDecrementDemand(),
				// and the thread that calls request(n), which will try to resume
				// reading.
				try {
					while (!readScheduler.isStopped()) {
						if (completed)
							return;

						// make sure we have a subscriber
						if (handlePending()) {
							if (debug.on())
								debug.log("pending subscriber subscribed");
							return;
						}

						// If an error was signaled, we might not be in the
						// the selector thread, and that is OK, because we
						// will just call onError and return.
						ReadSubscription current = subscription;
						Throwable error = errorRef.get();
						if (current == null) {
							assert error != null;
							if (debug.on())
								debug.log("error raised before subscriber subscribed: %s", (Object) error);
							return;
						}
						TubeSubscriber subscriber = current.subscriber;
						if (error != null) {
							completed = true;
							// safe to pause here because we're finished anyway.
							pauseReadEvent();
							if (debug.on())
								debug.log("Sending error " + error + " to subscriber " + subscriber);
							if (Log.channel()) {
								Log.logChannel("Raising error with subscriber for {0}: {1}", channelDescr(), error);
							}
							current.errorRef.compareAndSet(null, error);
							current.signalCompletion();
							readScheduler.stop();
							debugState("leaving read() loop with error: ");
							return;
						}

						// If we reach here then we must be in the selector thread.
						assert client.isSelectorThread();
						if (demand.tryDecrement()) {
							// we have demand.
							try {
								List<ByteBuffer> bytes = readAvailable(current.bufferSource);
								if (bytes == EOF) {
									if (!completed) {
										if (debug.on())
											debug.log("got read EOF");
										if (Log.channel()) {
											Log.logChannel("EOF read from channel: {0}", channelDescr());
										}
										completed = true;
										// safe to pause here because we're finished
										// anyway.
										pauseReadEvent();
										current.signalCompletion();
										readScheduler.stop();
									}
									debugState("leaving read() loop after EOF: ");
									return;
								} else if (Utils.remaining(bytes) > 0) {
									// the subscriber is responsible for offloading
									// to another thread if needed.
									if (debug.on())
										debug.log("read bytes: " + Utils.remaining(bytes));
									assert !current.completed;
									subscriber.onNext(bytes);
									// we could continue looping until the demand
									// reaches 0. However, that would risk starving
									// other connections (bound to other socket
									// channels) - as other selected keys activated
									// by the selector manager thread might be
									// waiting for this event to terminate.
									// So resume the read event and return now...
									resumeReadEvent();
									debugState("leaving read() loop after onNext: ");
									return;
								} else {
									// nothing available!
									if (debug.on())
										debug.log("no more bytes available");
									// re-increment the demand and resume the read
									// event. This ensures that this loop is
									// executed again when the socket becomes
									// readable again.
									demand.increase(1);
									resumeReadEvent();
									debugState("leaving read() loop with no bytes");
									return;
								}
							} catch (Throwable x) {
								signalError(x);
								continue;
							}
						} else {
							if (debug.on())
								debug.log("no more demand for reading");
							// the event is paused just after firing, so it should
							// still be paused here, unless the demand was just
							// incremented from 0 to n, in which case, the
							// event will be resumed, causing this loop to be
							// invoked again when the socket becomes readable:
							// This is what we want.
							// Trying to pause the event here would actually
							// introduce a race condition between this loop and
							// request(n).
							debugState("leaving read() loop with no demand");
							break;
						}
					}
				} catch (Throwable t) {
					if (debug.on())
						debug.log("Unexpected exception in read loop", t);
					signalError(t);
				} finally {
					if (readScheduler.isStopped()) {
						if (debug.on())
							debug.log("Read scheduler stopped");
						if (Log.channel()) {
							Log.logChannel("Stopped reading from channel {0}", channelDescr());
						}
					}
					handlePending();
				}
			}

			boolean handlePending() {
				ReadSubscription pending = pendingSubscription.getAndSet(null);
				if (pending == null)
					return false;
				if (debug.on())
					debug.log("handling pending subscription for %s", pending.subscriber);
				ReadSubscription current = subscription;
				if (current != null && current != pending && !completed) {
					current.subscriber.dropSubscription();
				}
				if (debug.on())
					debug.log("read demand reset to 0");
				subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
				pending.errorRef.compareAndSet(null, errorRef.get());
				if (!readScheduler.isStopped()) {
					subscription = pending;
				} else {
					if (debug.on())
						debug.log("socket tube is already stopped");
				}
				if (debug.on())
					debug.log("calling onSubscribe");
				pending.signalOnSubscribe();
				if (completed) {
					pending.errorRef.compareAndSet(null, errorRef.get());
					pending.signalCompletion();
				}
				return true;
			}
		}

		// A repeatable ReadEvent which is paused after firing and can
		// be resumed if required - see SocketFlowEvent;
		final class ReadEvent extends SocketFlowEvent {
			final InternalReadSubscription sub;

			ReadEvent(SocketChannel channel, InternalReadSubscription sub) {
				super(SelectionKey.OP_READ, channel);
				this.sub = sub;
			}

			@Override
			protected final void signalEvent() {
				try {
					client.eventUpdated(this);
					sub.signalReadable();
				} catch (Throwable t) {
					sub.signalError(t);
				}
			}

			@Override
			protected final void signalError(Throwable error) {
				sub.signalError(error);
			}

			@Override
			Logger debug() {
				return debug;
			}
		}
	}

	// ===================================================================== //
	// Buffer Management //
	// ===================================================================== //

	// This interface is used by readAvailable(BufferSource);
	public interface BufferSource {
		/**
		 * Returns a buffer to read data from the socket.
		 *
		 * @implNote Different implementation can have different strategies, as to which
		 *           kind of buffer to return, or whether to return the same buffer. The
		 *           only constraints are that: a. the buffer returned must not be null
		 *           b. the buffer position indicates where to start reading c. the
		 *           buffer limit indicates where to stop reading. d. the buffer is
		 *           'free' - that is - it is not used or retained by anybody else
		 *
		 * @return A buffer to read data from the socket.
		 */
		ByteBuffer getBuffer();

		/**
		 * Appends the read-data in {@code buffer} to the list of buffer to be sent
		 * downstream to the subscriber. May return a new list, or append to the given
		 * list.
		 *
		 * @implNote Different implementation can have different strategies, but must
		 *           obviously be consistent with the implementation of the getBuffer()
		 *           method. For instance, an implementation could decide to add the
		 *           buffer to the list and return a new buffer next time getBuffer() is
		 *           called, or could decide to add a buffer slice to the list and
		 *           return the same buffer (if remaining space is available) next time
		 *           getBuffer() is called.
		 *
		 * @param list   The list before adding the data. Can be null.
		 * @param buffer The buffer containing the data to add to the list.
		 * @param start  The start position at which data were read. The current buffer
		 *               position indicates the end.
		 * @return A possibly new list where a buffer containing the data read from the
		 *         socket has been added.
		 */
		List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start);

		/**
		 * Returns the given unused {@code buffer}, previously obtained from
		 * {@code getBuffer}.
		 *
		 * @implNote This method can be used, if necessary, to return the unused buffer
		 *           to the pull.
		 *
		 * @param buffer The unused buffer.
		 */
		default void returnUnused(ByteBuffer buffer) {
		}
	}

	// An implementation of BufferSource used for unencrypted data.
	// This buffer source uses heap buffers and avoids wasting memory
	// by forwarding read-only buffer slices downstream.
	// Buffers allocated through this source are simply GC'ed when
	// they are no longer referenced.
	private static final class SliceBufferSource implements BufferSource {
		private final Supplier<ByteBuffer> factory;
		private volatile ByteBuffer current;

		@SuppressWarnings("unused")
		public SliceBufferSource() {
			this(Utils::getBuffer);
		}

		public SliceBufferSource(Supplier<ByteBuffer> factory) {
			this.factory = Objects.requireNonNull(factory);
		}

		// Reuses the same buffer if some space remains available.
		// Otherwise, returns a new heap buffer.
		@Override
		public final ByteBuffer getBuffer() {
			ByteBuffer buf = current;
			buf = (buf == null || !buf.hasRemaining()) ? (current = factory.get()) : buf;
			assert buf.hasRemaining();
			return buf;
		}

		// Adds a read-only slice to the list, potentially returning a
		// new list with that slice at the end.
		@Override
		public final List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buf, int start) {
			// creates a slice to add to the list
			int limit = buf.limit();
			buf.limit(buf.position());
			buf.position(start);
			ByteBuffer slice = buf.slice();

			// restore buffer state to what it was before creating the slice
			buf.position(buf.limit());
			buf.limit(limit);

			// add the buffer to the list
			return SocketTube.listOf(list, slice.asReadOnlyBuffer());
		}
	}

	// An implementation of BufferSource used for encrypted data.
	// This buffer source uses direct byte buffers that will be
	// recycled by the SocketTube subscriber.
	//
	private static final class SSLDirectBufferSource implements BufferSource {
		private final BufferSupplier factory;
		private final HttpClientImpl client;
		private ByteBuffer current;

		public SSLDirectBufferSource(HttpClientImpl client) {
			this.client = Objects.requireNonNull(client);
			this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
		}

		// Obtains a 'free' byte buffer from the pool, or returns
		// the same buffer if nothing was read at the previous cycle.
		// The subscriber will be responsible for recycling this
		// buffer into the pool (see SSLFlowDelegate.Reader)
		@Override
		public final ByteBuffer getBuffer() {
			assert client.isSelectorThread();
			ByteBuffer buf = current;
			if (buf == null) {
				buf = current = factory.get();
			}
			assert buf.hasRemaining();
			assert buf.position() == 0;
			return buf;
		}

		// Adds the buffer to the list. The buffer will be later returned to the
		// pool by the subscriber (see SSLFlowDelegate.Reader).
		// The next buffer returned by getBuffer() will be obtained from the
		// pool. It might be the same buffer or another one.
		// Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because
		// recycling will happen in the flow before onNext returns, then the
		// pool can not grow larger than MAX_BUFFERS = 3 buffers, even though
		// it's shared by all SSL connections opened on that client.
		@Override
		public final List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buf, int start) {
			assert client.isSelectorThread();
			assert buf.isDirect();
			assert start == 0;
			assert current == buf;
			current = null;
			buf.limit(buf.position());
			buf.position(start);
			// add the buffer to the list
			return SocketTube.listOf(list, buf);
		}

		@Override
		public void returnUnused(ByteBuffer buffer) {
			// if current is null, then the buffer will have been added to the
			// list, through append. Otherwise, current is not null, and needs
			// to be returned to prevent the buffer supplier pool from growing
			// to more than MAX_BUFFERS.
			assert buffer == current;
			ByteBuffer buf = current;
			if (buf != null) {
				assert buf.position() == 0;
				current = null;
				// the supplier assert if buf has remaining
				buf.limit(buf.position());
				factory.recycle(buf);
			}
		}
	}

	// ===================================================================== //
	// Socket Channel Read/Write //
	// ===================================================================== //
	static final int MAX_BUFFERS = 3;
	static final List<ByteBuffer> EOF = Lists.of();
	static final List<ByteBuffer> NOTHING = Lists.of(Utils.EMPTY_BYTEBUFFER);

	// readAvailable() will read bytes into the 'current' ByteBuffer until
	// the ByteBuffer is full, or 0 or -1 (EOF) is returned by read().
	// When that happens, a slice of the data that has been read so far
	// is inserted into the returned buffer list, and if the current buffer
	// has remaining space, that space will be used to read more data when
	// the channel becomes readable again.
	private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
		ByteBuffer buf = buffersSource.getBuffer();
		assert buf.hasRemaining();

		int read;
		int pos = buf.position();
		List<ByteBuffer> list = null;
		while (buf.hasRemaining()) {
			try {
				while ((read = channel.read(buf)) > 0) {
					if (!buf.hasRemaining())
						break;
				}
			} catch (IOException x) {
				if (buf.position() == pos && list == null) {
					// make sure that the buffer source will recycle
					// 'buf' if needed
					buffersSource.returnUnused(buf);
					// no bytes have been read, just throw...
					throw x;
				} else {
					// some bytes have been read, return them and fail next time
					errorRef.compareAndSet(null, x);
					read = 0; // ensures outer loop will exit
				}
			}

			// nothing read;
			if (buf.position() == pos) {
				// An empty list signals the end of data, and should only be
				// returned if read == -1. If some data has already been read,
				// then it must be returned. -1 will be returned next time
				// the caller attempts to read something.
				buffersSource.returnUnused(buf);
				if (list == null) {
					// nothing read - list was null - return EOF or NOTHING
					list = read == -1 ? EOF : NOTHING;
				}
				break;
			}

			// check whether this buffer has still some free space available.
			// if so, we will keep it for the next round.
			list = buffersSource.append(list, buf, pos);

			if (read <= 0 || list.size() == MAX_BUFFERS) {
				break;
			}

			buf = buffersSource.getBuffer();
			pos = buf.position();
			assert buf.hasRemaining();
		}
		return list;
	}

	private static <T> List<T> listOf(List<T> list, T item) {
		int size = list == null ? 0 : list.size();
		switch (size) {
			case 0:
				return Lists.of(item);
			case 1:
				return Lists.of(list.get(0), item);
			case 2:
				return Lists.of(list.get(0), list.get(1), item);
			default: // slow path if MAX_BUFFERS > 3
				List<T> res = list instanceof ArrayList ? list : new ArrayList<>(list);
				res.add(item);
				return res;
		}
	}

	private long writeAvailable(List<ByteBuffer> bytes) throws IOException {
		ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY);
		final long remaining = Utils.remaining(srcs);
		long written = 0;
		while (remaining > written) {
			try {
				long w = channel.write(srcs);
				assert w >= 0 : "negative number of bytes written:" + w;
				if (w == 0) {
					break;
				}
				written += w;
			} catch (IOException x) {
				if (written == 0) {
					// no bytes were written just throw
					throw x;
				} else {
					// return how many bytes were written, will fail next time
					break;
				}
			}
		}
		return written;
	}

	private void resumeEvent(SocketFlowEvent event, Consumer<Throwable> errorSignaler) {
		boolean registrationRequired;
		synchronized (lock) {
			registrationRequired = !event.registered();
			event.resume();
		}
		try {
			if (registrationRequired) {
				client.registerEvent(event);
			} else {
				client.eventUpdated(event);
			}
		} catch (Throwable t) {
			errorSignaler.accept(t);
		}
	}

	private void pauseEvent(SocketFlowEvent event, Consumer<Throwable> errorSignaler) {
		synchronized (lock) {
			event.pause();
		}
		try {
			client.eventUpdated(event);
		} catch (Throwable t) {
			errorSignaler.accept(t);
		}
	}

	@Override
	public void connectFlows(TubePublisher writePublisher, TubeSubscriber readSubscriber) {
		if (debug.on())
			debug.log("connecting flows");
		this.subscribe(readSubscriber);
		writePublisher.subscribe(this);
	}

	@Override
	public String toString() {
		return dbgString();
	}

	final String dbgString() {
		return "SocketTube(" + id + ")";
	}

	final String channelDescr() {
		return String.valueOf(channel);
	}
}